View Javadoc

1   /*
2    * Copyright (c) 2004-2005, University Health Network.  All rights reserved. Distributed under the BSD 
3    * license (see http://opensource.org/licenses/bsd-license.php).
4    *  
5    * Created on 25-Jan-2005
6    */
7   package ca.uhn.cache.util;
8   
9   import java.util.ArrayList;
10  import java.util.Collection;
11  import java.util.Iterator;
12  
13  import EDU.oswego.cs.dl.util.concurrent.Executor;
14  import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
15  
16  import ca.uhn.cache.util.exception.MutableIteratorException;
17  
18  /***
19   * Default implementation of IMutableMergeableIterator.  
20   * 
21   * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
22   * @version $Revision: 1.1 $ updated on $Date: 2005/01/26 00:25:50 $ by $Author: bryan_tripp $
23   */
24  public class MutableMergeableIterator extends MutableIterator implements
25          IMutableMergeableIterator {
26  
27      private Collection myMergedIterators;
28      private int myInProgressMerges = 0;  
29      private Executor myExecutor;
30      
31      /***
32       * @param theExecutor to manage threading 
33       */
34      public MutableMergeableIterator(Executor theExecutor) {
35          myMergedIterators = new ArrayList(10);
36          myExecutor = theExecutor;
37      }
38      
39      /***
40       * New instance with default Executor.  
41       */
42      public MutableMergeableIterator() {
43          myMergedIterators = new ArrayList(10);
44          myExecutor = new ThreadedExecutor();
45      }
46  
47      /*** 
48       * @see ca.uhn.cache.util.IMutableMergeableIterator#merge(java.util.Iterator)
49       */
50      public void merge(final Iterator theIterator) {
51          myInProgressMerges++;
52  
53          if (theIterator instanceof IMutableIterator) {
54              myMergedIterators.add(theIterator);            
55          }
56          
57          Runnable merger = new Runnable() {
58              public void run() {
59                  doMerge(theIterator);
60              }
61          };
62          
63          thread(merger);
64      }
65      
66      private void doMerge(Iterator theIterator) {
67          try {
68              while (theIterator.hasNext()) {
69                  add(theIterator.next());
70              }                    
71          } catch (MutableIteratorException e) {
72              declareException(e);
73          }
74          
75          myInProgressMerges--;            
76      }
77      
78      /***
79       * Waits until all in-progress merges are complete, then calls super.freeze().  Note 
80       * that it's possible to continue calling add() in the mean time ... but don't, 
81       * because you never know when the merges will finish.    
82       */
83      public void freeze() {
84          //must do this in a separate thread, because we want to return right away, but we 
85          //must wait to add the marker until after in-progress merges are complete
86          Runnable freezer = new Runnable() {
87              public void run() {
88                  doFreeze();
89              }
90          };
91          
92          thread(freezer);
93      }
94      
95      private void doFreeze() {
96          //this guarantees the end marker can't be added between a call to merge() and when 
97          //the corresponding merge thread finishes
98          try {
99              while (myInProgressMerges > 0) {
100                 Thread.sleep(2);
101             }
102         } catch (InterruptedException e) {
103             //OK
104         }
105 
106         super.freeze();        
107     }
108     
109     /*** 
110      * Closes merged IMutableIterators.  Override if you need to do more here.  But don't 
111      * forget to call super.close().  
112      *  
113      * @see ca.uhn.cache.IMutableIterator#close()
114      */
115     public void close() {
116         for (Iterator it = myMergedIterators.iterator(); it.hasNext(); ) {
117             ((IMutableIterator) it.next()).close();
118         }
119     }
120 
121     private void thread(Runnable theTask) {
122         try {
123             myExecutor.execute(theTask);
124         } catch (InterruptedException e) {
125             declareException(e);
126         }
127     }
128     
129 }